Large result set retrieval: streaming-only, all split types, query rewrite fix #128
Conversation
…e APIs

Implements three-tier companion-mode bulk retrieval for returning thousands to millions of rows from parquet-backed splits via Arrow FFI with bounded memory. Includes:

- `DocIdCollector` (no-score search)
- fused single-call path
- streaming session pipeline with adaptive I/O strategy
- `ARC_REGISTRY`-based session handles for memory safety
- `StreamingSession` AutoCloseable Java wrapper with double-close protection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
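The registry-based handle pattern named in the commit can be sketched as a global map that hands opaque `u64` handles across the JNI boundary. This is a minimal sketch under assumptions: the `Session` struct, `register`/`lookup`/`close` names, and the `Mutex<HashMap>` layout are illustrative stand-ins, not the PR's actual `ARC_REGISTRY` internals.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};

// Hypothetical session state; the real StreamingRetrievalSession holds
// channels, buffers, and Arrow FFI state.
struct Session {
    split_id: String,
}

static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);

fn registry() -> &'static Mutex<HashMap<u64, Arc<Session>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<u64, Arc<Session>>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

// Register a session and hand an opaque handle across the JNI boundary.
fn register(session: Session) -> u64 {
    let handle = NEXT_HANDLE.fetch_add(1, Ordering::Relaxed);
    registry().lock().unwrap().insert(handle, Arc::new(session));
    handle
}

// Look up without transferring ownership; the Arc keeps the session alive
// even if another thread closes the handle mid-call.
fn lookup(handle: u64) -> Option<Arc<Session>> {
    registry().lock().unwrap().get(&handle).cloned()
}

// Close is idempotent: a second close finds nothing and is a no-op, which
// is what gives the Java wrapper its double-close protection.
fn close(handle: u64) -> bool {
    registry().lock().unwrap().remove(&handle).is_some()
}
```

Because `lookup` returns a cloned `Arc`, a concurrent `close` cannot free memory out from under an in-flight native call, which is the memory-safety property the commit describes.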
Claude finished @schenksj's task — Code Review Complete

Reviewing PR #128.

Summary: Found 4 Issues (1 Critical, 2 Moderate, 1 Minor)

✅ Good Architecture Design
🔴 CRITICAL: Session Handle Memory Leak — ✅ RESOLVED

🟡 MODERATE: Integer Overflow Risk

```rust
let total: usize = segment_fruits.iter().map(|v| v.len()).sum();
let mut result = Vec::with_capacity(total);
```

Issue: On very large result sets, the unchecked sum can overflow `usize`.

Risk: Panic on allocation or incorrect capacity calculation.

Recommendation:

```rust
let total = segment_fruits.iter()
    .map(|v| v.len())
    .try_fold(0usize, |acc, len| acc.checked_add(len))
    .ok_or_else(|| anyhow!("Result set too large: document count overflow"))?;
```

🟡 MODERATE: Silent Error Swallowing in Stream

```rust
if tx.send(Ok(normalized)).await.is_err() {
    perf_println!("⏱️ STREAMING: consumer dropped — stopping producer");
    return Ok(()); // ← Silent success despite potential error
}
```

Issue: Producer treats channel closure as success, potentially masking critical errors.

Risk: Silent data loss if consumer fails early.

Recommendation: Distinguish early failure from normal completion:

```rust
if tx.send(Ok(normalized)).await.is_err() {
    if rows_emitted == 0 {
        return Err(anyhow!("Consumer dropped before any data sent - possible error"));
    }
    perf_println!("⏱️ STREAMING: consumer dropped after {} rows", rows_emitted);
    return Ok(());
}
```

🟢 MINOR: Missing Bounds Check

```rust
let batches = read_parquet_batches_for_file(/* ... */);
```

Issue: No validation that `file_idx` is in bounds before indexing into `manifest.parquet_files`.

Risk: Potential panic on malformed input.

Recommendation: Add bounds check:

```rust
if file_idx >= manifest.parquet_files.len() {
    return Err(anyhow!("Invalid file_idx {} >= {}", file_idx, manifest.parquet_files.len()));
}
let file_entry = &manifest.parquet_files[file_idx];
```

✅ No Issues Found In:

Overall Assessment: Solid implementation with good architecture. The identified issues are edge cases that should be addressed for production robustness.
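The overflow-safe total the review recommends can be exercised in isolation. The sketch below operates on per-segment lengths directly so the overflow case is testable without allocating huge vectors; `checked_total` is a hypothetical stand-in for the sum inside `merge_fruits`.

```rust
// Overflow-safe sum of per-segment hit counts, as the review recommends:
// checked_add via try_fold returns None instead of wrapping or panicking,
// so the caller can surface a "result set too large" error.
fn checked_total(segment_lens: &[usize]) -> Option<usize> {
    segment_lens
        .iter()
        .try_fold(0usize, |acc, &len| acc.checked_add(len))
}
```

In the real code the `None` would be mapped to an `anyhow` error before `Vec::with_capacity` is ever called with a wrapped value.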
…check

1. `merge_fruits`: use `checked_add` to prevent `usize` overflow on very large result sets instead of an unchecked sum.
2. Streaming producer: distinguish an early consumer drop (`rows_emitted == 0`, returns an error) from a normal mid-stream drop (returns `Ok`).
3. Streaming producer: validate `file_idx` bounds before indexing into `manifest.parquet_files` to prevent a panic on malformed input.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Claude finished @schenksj's task — Code Review Complete

Reviewing PR #128.

Summary: Found 1 Minor Issue

✅ Good Architecture Design
🟢 MINOR: Missing Bounds Check in Arrow-to-Tantivy Module

```rust
let batches = read_parquet_batches_for_file(
    file_idx,
    &rows,
    projected_fields.as_deref().map(|s| s as &[String]),
    manifest,
    storage,
    metadata_cache,
    byte_cache,
    effective_coalesce,
).await?;
```

Issue: While bounds checking exists at line 171-176, the `read_parquet_batches_for_file` call passes `file_idx` along without revalidation.

Risk: Potential panic if the downstream function doesn't validate the file index.

Recommendation: Verify that `read_parquet_batches_for_file` validates `file_idx` before indexing.

✅ No Issues Found In:

Overall Assessment: Excellent implementation with solid architecture and comprehensive safety measures. The single minor issue is a best practice recommendation rather than a functional bug.
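As an alternative to an explicit length comparison, `slice::get` folds the bounds check and the error path into one expression. The `Manifest` shape and the `String` error type below are assumptions for illustration, not the real `parquet_companion` types.

```rust
// Stand-in for the manifest type; the real one lives in the
// parquet_companion module.
struct Manifest {
    parquet_files: Vec<String>,
}

// Indexing with get() turns an out-of-range file_idx into an Err instead
// of a panic, equivalent to the explicit `file_idx >= len` check.
fn file_entry(manifest: &Manifest, file_idx: usize) -> Result<&String, String> {
    manifest.parquet_files.get(file_idx).ok_or_else(|| {
        format!(
            "Invalid file_idx {} >= {}",
            file_idx,
            manifest.parquet_files.len()
        )
    })
}
```

This style makes it impossible to index the slice on the success path without having passed the check first.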
…rite bug

Four changes in this commit:

1. Remove the fused (non-streaming) companion retrieval path entirely. The deprecated `searchAndRetrieveArrowFfi()` now delegates to the streaming session internally. Deletes `fused_retrieval.rs` and its JNI function.
2. Add Arrow FFI streaming for non-companion (regular tantivy) splits. New `streaming_doc_retrieval.rs` converts tantivy doc store documents to Arrow RecordBatches via `doc_async()`, reusing the same `StreamingRetrievalSession` infrastructure. The JNI function now routes companion splits through parquet streaming and regular splits through tantivy doc store streaming.
3. Fix the streaming query rewrite bug (TANTIVY4JAVA_STREAMING_FAST_FIELD_BUG). `perform_bulk_search()` was missing the companion query rewrites that the regular `search()` path applies: FieldPresence → `_phash_*` rewrite and exact_only term → hash lookup rewrite. Without these, IS NOT NULL returned 0 rows and exact_only EqualTo returned nothing. Also adds an `ensure_fast_fields_for_query()` call for parquet-transcoded fast fields.
4. Add regression tests for streaming + field presence, IS NULL, and exact_only queries, comparing streaming results against a `search()` baseline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
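The companion rewrites in change 3 can be sketched with a toy query enum: FieldPresence queries move to the `_phash_*` presence field, and terms on exact_only fields become hash lookups. The enum, the hasher, and the exact field-naming details are illustrative stand-ins, not the PR's actual tantivy query rewriting.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy query AST; the real code rewrites tantivy query objects.
#[derive(Debug, PartialEq)]
enum Query {
    FieldPresence(String),
    Term { field: String, value: String },
    HashTerm { field: String, hash: u64 },
}

// Stand-in hash; the real exact_only lookup uses the hash scheme the
// companion split was built with.
fn hash_value(value: &str) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

// Without this rewrite, IS NOT NULL (FieldPresence) matched nothing
// because the original field is not indexed in the companion split, and
// exact_only terms never hit the stored hashes.
fn rewrite_companion_query(q: Query, exact_only_fields: &[&str]) -> Query {
    match q {
        Query::FieldPresence(field) => Query::FieldPresence(format!("_phash_{field}")),
        Query::Term { field, value } if exact_only_fields.contains(&field.as_str()) => {
            Query::HashTerm { field, hash: hash_value(&value) }
        }
        other => other,
    }
}
```

Applying the same rewrite in `perform_bulk_search()` as in `search()` is what keeps the streaming and non-streaming paths returning identical result sets.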
Claude finished @schenksj's task — Code Review Complete

Reviewing PR #128.

Summary: Found 1 Minor Issue

✅ Good Architecture Design
🟢 MINOR: Missing Bounds Check in Arrow-to-Tantivy Module

```rust
let batches = read_parquet_batches_for_file(
    file_idx,
    &rows,
    projected_fields.as_deref().map(|s| s as &[String]),
    manifest,
    storage,
    metadata_cache,
    byte_cache,
    effective_coalesce,
).await?;
```

Issue: While bounds checking exists at line 171-176, the `read_parquet_batches_for_file` call passes `file_idx` along without revalidation.

Risk: Potential panic if the downstream function doesn't validate the file index.

Recommendation: Verify that `read_parquet_batches_for_file` validates `file_idx` before indexing.

✅ No Issues Found In:

Overall Assessment: Excellent implementation with solid architecture and comprehensive safety measures. The single minor issue is a best practice recommendation rather than a functional bug.

Summary
- `startStreamingRetrieval()` now works for both companion (parquet) and regular (tantivy doc store) splits
- `searchAndRetrieveArrowFfi()` deprecated and reimplemented as a streaming wrapper — single code path reduces bug surface
- `streaming_doc_retrieval.rs` converts tantivy doc store documents to Arrow RecordBatches via `doc_async()` with bounded memory
- `perform_bulk_search()` now applies the same companion query rewrites as the regular `search()` path — fixes IS NOT NULL returning 0 rows and exact_only EqualTo returning nothing

Changes since initial PR
- Deleted `fused_retrieval.rs` — single streaming path for all result sizes
- `streaming_doc_retrieval.rs` — non-companion split streaming via tantivy doc store
- `bulk_retrieval.rs` — added `rewrite_companion_query()` for FieldPresence → `_phash_*` and exact_only → hash rewrites, plus `ensure_fast_fields_for_query()` for parquet-transcoded fast fields
- `StreamingRetrievalSession::new()` constructor in `streaming_ffi.rs`
- `nativeStartStreamingRetrieval` now routes companion vs regular splits

Files changed
- `native/src/split_searcher/fused_retrieval.rs` — DELETED
- `native/src/split_searcher/streaming_doc_retrieval.rs` — NEW (non-companion streaming)
- `native/src/split_searcher/bulk_retrieval.rs` — query rewrite fix
- `native/src/split_searcher/document_retrieval/doc_retrieval_jni.rs` — dual-path routing
- `native/src/parquet_companion/streaming_ffi.rs` — new constructor
- `native/src/split_searcher/mod.rs` — module registration
- `SplitSearcher.java` — updated javadoc, deprecated wrapper
- `LargeResultSetRetrievalTest.java` — removed fused tests, added regression tests
- `docs/LARGE_RESULT_SET_DEVELOPER_GUIDE.md` — updated architecture diagram

Test plan
- `cargo check` passes
- `cargo test --lib streaming_doc_retrieval` — 4 unit tests pass
- `mvn test` — 1261 tests, 0 failures, 0 errors

🤖 Generated with Claude Code